feat: get_ranges (#3925)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Split the overlong first line into a short numpydoc summary plus an extended description.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
After the input split at the top of coalesced_get, merged groups only ever contain RangeByteRequest members. Replace the per-element isinstance filters (and the defensive ``else 0`` sort-key branch) with a single assertion at the top of the merged-group block and direct attribute access. Also remove the unreachable ``if total == 0: return`` guard (``indexed`` is non-empty by construction once we pass the earlier guard).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Exercise the ``kind == "missing"`` branch in the uncoalescable single-fetch arm for Offset/Suffix/None inputs, which was not hit by existing tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two related correctness issues in coalesced_get's drain loop:

1. When the consumer breaks out of the async-for (early exit), the generator's finally block only awaited in-flight tasks rather than cancelling them. That wasted I/O. Cancel first, then gather.
2. The drain loop waited on completion_queue for ``total`` entries, but after a "missing" or "error" we cancel pending tasks -- and cancelled tasks never enqueue a completion. With max_concurrency > 1 this could hang. Rework the drain loop to break out immediately on the first miss/error; the finally block handles cleanup.

The new structure also collapses the redundant miss/error branches and removes the now-unused ``total``/``drained``/``stopped`` bookkeeping.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
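The cancel-first cleanup described in point 1 can be sketched roughly as follows. This is a minimal standalone model, not the PR's actual code; `_cleanup`, `slow_fetch`, and `main` are illustrative names:

```python
import asyncio

async def _cleanup(tasks: set[asyncio.Task[None]]) -> None:
    # Cancel first so still-running fetches stop doing wasted I/O...
    for t in tasks:
        t.cancel()
    # ...then gather so cancellation fully propagates; return_exceptions=True
    # swallows the resulting CancelledErrors instead of re-raising them here.
    await asyncio.gather(*tasks, return_exceptions=True)

async def main() -> int:
    async def slow_fetch() -> None:
        await asyncio.sleep(60)  # stands in for a slow byte-range read

    tasks = {asyncio.create_task(slow_fetch()) for _ in range(3)}
    await asyncio.sleep(0)  # let the tasks start before "breaking out"
    await _cleanup(tasks)
    return sum(t.cancelled() for t in tasks)

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The key ordering point: awaiting before cancelling lets slow fetches run to completion; cancelling first bounds the cleanup time by cancellation latency instead of I/O latency.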
Exercises the concurrent path where a missing key is observed while other fetches are still in flight. Uses an asyncio.Event to gate late arrivals until after the miss has been processed, giving the drain loop an opportunity to observe and discard post-stop completions, and verifies the iterator terminates cleanly without hanging or raising.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drives many slow ranges with a small max_concurrency, breaks out of the async-for after the first yield, and verifies that at least one still-running fetch was cancelled rather than being left to run to completion. Cancellation is observed via a counter in the fetch's CancelledError branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
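The counter-in-`CancelledError` trick can be sketched like this (a standalone model, not the test's actual code; `fetch` and `main` are illustrative):

```python
import asyncio

cancelled_calls = 0

async def fetch(i: int) -> int:
    global cancelled_calls
    try:
        await asyncio.sleep(60)  # a deliberately slow range fetch
        return i
    except asyncio.CancelledError:
        cancelled_calls += 1  # observe that this fetch was cancelled
        raise  # always re-raise CancelledError

async def main() -> int:
    global cancelled_calls
    cancelled_calls = 0
    tasks = [asyncio.create_task(fetch(i)) for i in range(4)]
    await asyncio.sleep(0)  # tasks start; the consumer then "breaks out" early
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return cancelled_calls

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Re-raising inside the `except asyncio.CancelledError` branch matters: swallowing it would make the task report normal completion and defeat the cancellation bookkeeping.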
coalesced_get is implemented as an async generator (uses yield) and callers need access to aclose() to drive its finally block deterministically. Declaring the return type as AsyncGenerator instead of AsyncIterator exposes aclose()/asend()/athrow() through the type system, matches the runtime object, and lets consumers (e.g. the consumer-break test) avoid type-ignore escape hatches.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
pyproject asyncio_mode=auto already covers async test dispatch; the explicit pytestmark was a vestige.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Used 0000.feature.md as a placeholder; rename to {pr-number}.feature.md once the PR is opened.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The SupportsGetRanges protocol is private; a user-facing release note shouldn't advertise it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
for context, we do already have a
The min_deps CI job pins fsspec to 2023.10.0, which predates AsyncFileSystemWrapper. Wrapping a sync MemoryFileSystem fails there at fixture setup. Guard the affected tests with the same skipif pattern already used in test_fsspec.py.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
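The guard is roughly of this shape. A sketch only: the 2024.12.0 cutoff for AsyncFileSystemWrapper is an assumption, and `requires_async_wrapper` / `fsspec_predates_async_wrapper` are made-up names; check test_fsspec.py for the real marker:

```python
import pytest

def fsspec_predates_async_wrapper(version: str) -> bool:
    # Compare CalVer strings numerically, e.g. "2023.10.0" < "2024.12.0".
    def key(v: str) -> tuple[int, ...]:
        return tuple(int(part) for part in v.split(".")[:3])
    return key(version) < key("2024.12.0")  # assumed introduction point

# Module-level marker, mirroring the skipif pattern in test_fsspec.py:
requires_async_wrapper = pytest.mark.skipif(
    fsspec_predates_async_wrapper("2023.10.0"),  # e.g. the min_deps pin
    reason="fsspec predates AsyncFileSystemWrapper",
)
```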
Codecov Report

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #3925      +/-   ##
==========================================
+ Coverage   93.28%   93.30%   +0.01%
==========================================
  Files          87       88       +1
  Lines       11745    11823      +78
==========================================
+ Hits        10956    11031      +75
- Misses        789      792       +3
```
I think making it iterable adds complexity and confusion about whether request coalescing is expected to be applied here or not. If you have requests:

Then of course we should coalesce all the first 3 requests into one. But then the async iterable implies that we might want to use one of the first responses before the last one arrives... but the last request would've arrived first. Either, if you really want the response type to be async iterable, the responses should probably have an

But I think it would be much simpler to take in a sequence of byte ranges and return a sequence of results. Just like object-store/obstore/obspec.
In the design in this PR we are iterating over IO calls the reader actually did, which is less than or equal to the number of byte ranges requested. So, assuming the first three are fused, we would either see: or

Which, for sharding, is useful -- you can start decoding chunks immediately while you wait for the rest of the sub-chunks to come in. Does that make sense? Or am I misunderstanding something.
maxrjones left a comment
Nice, I think this is well-implemented and tested. I have one correctness issue and a few comments.
While I don't think this is the responsibility of this PR, I still want to register my general dissatisfaction with our API design that this PR inherits and extends, in particular with mixing ABC and protocol-based abstraction mechanisms. The issue here is that the new methods are not available in wrapper stores such as LoggingStore. Let's continue to work on a better architecture for the storage API.
```python
# Launch all work as tasks. The semaphore bounds actual I/O concurrency.
tasks: set[asyncio.Task[None]] = set()
for group in groups:
    tasks.add(asyncio.create_task(_fetch_group(ctx, group)))
```
Suggested change:

```python
# A one-member "group" is a RangeByteRequest that did not merge with a
# neighbor; route it through `_fetch_single` so it skips the redundant
# slice-by-zero in `_fetch_group`.
if len(group) == 1:
    idx, single = group[0]
    tasks.add(asyncio.create_task(_fetch_single(ctx, idx, single)))
else:
    tasks.add(asyncio.create_task(_fetch_group(ctx, group)))
```
@maxrjones, that's a bit of a micro-optimization, but if you want to make that logic change, I suggest pushing that into _fetch_group directly, where it will check the length of the list, and defer to _fetch_single for a singleton list, leaving the code here cleaner.
```python
self.fs = fs
self.path = path
self.allowed_exceptions = allowed_exceptions
self.coalesce_options = coalesce_options
```
Suggested change:

```python
# Copy so per-instance mutation of `self.coalesce_options` does not
# leak into the module-level `DEFAULT_COALESCE_OPTIONS` singleton
# (or into other stores constructed without an explicit kwarg).
self.coalesce_options: CoalesceOptions = (
    DEFAULT_COALESCE_OPTIONS.copy() if coalesce_options is None else coalesce_options.copy()
)
```
coalesce_options should be immutable (or at least typed as such), so that any erroneous attempt to mutate them should fail (either during static analysis or at runtime), so this should be unnecessary.
To aid in this effort, I suggest considering a frozen dataclass rather than a TypedDict for CoalesceOptions.
To aid in this effort, I suggest considering a frozen dataclass rather than a TypedDict for CoalesceOptions.
we can annotate the fields as ReadOnly
Could do, but in this case, I would lean towards a kw-only, frozen dataclass for the following reasons:

- no real motivation for using a dict here (we're not marshaling between JSON)
- we can set defaults for every field, thus eliminating the need for a separate default instance (simply using `CoalesceOptions()` as the param default would do the trick)
- allows specifying only what we want to override rather than having to specify everything (e.g., passing `CoalesceOptions(max_concurrency=20)` will use the default values for the other fields)

Of course, we could mark the TypedDict as total=False, but that would require a bit more code to fill in the default values for the user (not much, but a bit less convenient than using a dataclass, IMO).
I've added a suggestion to the CoalesceOptions definition.
```python
# src/zarr/storage/_protocols.py
from __future__ import annotations

from typing import TYPE_CHECKING, Protocol, runtime_checkable

if TYPE_CHECKING:
    from collections.abc import AsyncIterator, Sequence

    from zarr.abc.store import ByteRequest
    from zarr.core.buffer import Buffer, BufferPrototype


@runtime_checkable
class SupportsGetRanges(Protocol):
    """Stores that satisfy this protocol can efficiently read many byte ranges
    from a single key in a single call, typically via coalescing and concurrent fetch.

    Private / unstable. Shape may change before being made public.
    """

    def get_ranges(
        self,
        key: str,
        byte_ranges: Sequence[ByteRequest | None],
        *,
        prototype: BufferPrototype,
    ) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]:
        """Read many byte ranges from `key`.

        Each yield corresponds to one underlying I/O operation.

        See `zarr.core._coalesce.coalesced_get` for full semantics.
        """
        ...
```
we should either put this in zarr/abc/store.py with the other protocols or move the other protocols here. It's confusing to have this separate from similar protocols.
```python
"""Stores that satisfy this protocol can efficiently read many byte ranges
from a single key in a single call, typically via coalescing and concurrent fetch.

Private / unstable. Shape may change before being made public.
```
given that this is intended to be private/unstable, should the methods be prefixed with a _ and the addition not included in the changelog as a feature release?
Co-authored-by: Max Jones <14077947+maxrjones@users.noreply.github.com>
I don't mind adding this method to the
```python
DEFAULT_COALESCE_OPTIONS: CoalesceOptions = {
    "max_gap_bytes": 1 << 20,  # 1 MiB
    "max_coalesced_bytes": 16 << 20,  # 16 MiB
    "max_concurrency": 10,
```

Suggested change: remove these lines (with defaults on the options type itself, a separate default instance becomes unnecessary).
…ileNotFoundError when get yields None
```python
max_gap_bytes: int | None = None,
max_coalesced_bytes: int | None = None,
```
Why are you not specifying the defaults here? Doing so would make it obvious in an IDE what the defaults are, without having to go look for them. It also allows you to simplify the function by eliminating the is None checks.
I think that's a good idea. In order to preserve the None sentinel behavior in an ergonomic way, we need to bring back a typeddict that models these kwargs so callers can construct such a dict and omit any keys that should take the default value.
I don't think a typeddict is necessary. I'm fine with the individual kwargs, but I would prefer to eliminate the Nones and default to the default values explicitly.

The problem with a typeddict in this case is that it makes it more awkward to deal with default values, or at least more verbose.
The Nones are gone, and the defaults are set in just the store get_ranges method. This is less convenient for callers of the internal routines, but IMO balances convenience for people calling the store method with avoiding redundant default parameters at different levels. No need for typeddicts.
```python
max_concurrency: int | None = None,
max_gap_bytes: int | None = None,
max_coalesced_bytes: int | None = None,
```
Again, I suggest using the defaults. I realize it may get a bit redundant to repeat the defaults everywhere, but doing so is more pleasant for the user experience.
@maxrjones and @chuckwondo thanks for the great feedback, I rolled a lot of your points into a series of changes, summarized in this bulleted list:
...I see Chuck has more suggestions, so the above list might already be stale xD
Co-authored-by: Chuck Daniels <cjdaniels4@gmail.com>
Co-authored-by: Chuck Daniels <cjdaniels4@gmail.com>
Co-authored-by: Chuck Daniels <cjdaniels4@gmail.com>
…into feat/get-many
…into feat/get-many
chuckwondo left a comment
We're getting very close!
```python
max_concurrency: int = 10,
max_gap_bytes: int = 1 << 20,
max_coalesced_bytes: int = 16 << 20,
```
Nice. I think this makes good sense. Perhaps just add some comments for clarity:

Suggested change:

```python
max_concurrency: int = 10,
max_gap_bytes: int = 1 << 20,  # 1 MiB
max_coalesced_bytes: int = 16 << 20,  # 16 MiB
```
```python
max_concurrency: int,
max_gap_bytes: int,
max_coalesced_bytes: int,
) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]:
```
So we don't have to cast in order to invoke aclose():
Suggested change:

```python
) -> AsyncGenerator[Sequence[tuple[int, Buffer | None]]]:
```
```python
# Unwrap: prefer GeneratorExit, then a single inner exception, otherwise raise group.
for exc in eg.exceptions:
    if isinstance(exc, GeneratorExit):
        raise exc from None
if len(eg.exceptions) == 1:
    raise eg.exceptions[0] from None
raise
```
We do NOT want to reraise GeneratorExit:
Suggested change:

```python
# Unwrap: prefer a single inner exception, otherwise raise group.
if subgroup := eg.subgroup(lambda e: not isinstance(e, GeneratorExit)):
    e = subgroup.exceptions[0] if len(subgroup.exceptions) == 1 else subgroup
    raise e from None
```
```python
# Explicitly close the generator so its finally block runs (cancelling
# in-flight tasks) before we make assertions. The narrow AsyncIterator
# return type does not expose `.aclose()`, but the runtime object is an
# async generator and supports it.
await cast("AsyncGenerator[Any, None]", agen).aclose()
```
Suggested change:

```python
# Explicitly close the generator so its finally block runs (cancelling
# in-flight tasks) before we make assertions.
await agen.aclose()
```
```python
assert cancelled_calls >= 1
assert completed_calls >= 1
```
Suggested change:

```python
assert completed_calls >= 1
assert cancelled_calls == len(ranges) - completed_calls
```
This PR adds a `get_ranges` protocol for stores. The protocol defines the shape of a function that fetches multiple byte ranges within the same stored object. The purpose is to define a method stores can opt into if they offer an efficient way to fetch multiple byte ranges from the same object, which would be immediately useful for the sharding codec. The protocol is quoted in full earlier in the thread.

The return type is an async iterator over sequences, where each sequence is the result of an IO operation the store performed. This gives the caller some observability into the actual coalescing, if any, that occurred. Results are returned in completion order, so the inner result type is `tuple[int, Buffer | None]`, where the `int` is the index into the input `byte_ranges` for that result.

Only byte range requests that declare an explicit interval (`RangeByteRequest`) are coalesced. Any other byte range, or `None`, results in no coalescing, and so those ranges will be fetched separately. I assume here that we do not care about coalescing overlapping suffix or prefix range requests, but we could add support for that if we need to.

In addition to this protocol, there's a freestanding function that takes:

- an `f(byte range) -> Awaitable[Buffer]` function (which we would generate by combining `Store.get` with `functools.partial`)

This function contains basic byte range coalescing logic, and it can be re-used for multiple stores. This is a non-abstract-base-class alternative to a default implementation on an ABC.

That freestanding function is used to implement `get_ranges` on the `FsspecStore`. This is probably not useful for local- or memory-backed storage, but is useful for remote storage. The actual implementation is lightweight.

cc @aldenks, the idea here is to build a basis for your range coalescing work for the sharding codec

cc @kylebarron, would love your feedback on this design.
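For readers skimming the description, here is a runnable sketch of the protocol. The signature is copied from the `src/zarr/storage/_protocols.py` excerpt quoted earlier in the thread; the `Any` stand-ins for zarr's types are just so the sketch runs standalone:

```python
from __future__ import annotations

from collections.abc import AsyncIterator, Sequence
from typing import Any, Protocol, runtime_checkable

# Stand-ins so the sketch is self-contained; the real protocol imports
# ByteRequest, Buffer, and BufferPrototype from zarr.
ByteRequest = Any
Buffer = Any
BufferPrototype = Any

@runtime_checkable
class SupportsGetRanges(Protocol):
    def get_ranges(
        self,
        key: str,
        byte_ranges: Sequence[ByteRequest | None],
        *,
        prototype: BufferPrototype,
    ) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]:
        """Each yield corresponds to one underlying I/O operation."""
        ...
```

Because the protocol is `runtime_checkable`, stores opt in structurally: any store defining a matching `get_ranges` method passes an `isinstance(store, SupportsGetRanges)` check with no inheritance required.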
related issues/PRs:
#1758
#3004